package com.xzx.flink.streamapi.sink;

import com.xzx.flink.bean.ClickEvent;
import com.xzx.flink.streamapi.source.ClickSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author xinzhixuan
 * @version 1.0
 * @date 2021-08-29 10:41 下午
 */
public class Sink_02_KafkaV2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<ClickEvent> streamSource = env.addSource(new ClickSource());

        String brokers = "172.25.48.130:29092";
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("fink-test")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .setKeySerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        streamSource.map(ClickEvent::toString).sinkTo(sink);
        env.execute(Sink_02_KafkaV2.class.getName());
    }
}
